package vip.mate.message.controller;
/**
 *                     .::::.
 *                   .::::::::.
 *                  :::::::::::    佛主保佑、永无Bug
 *              ..:::::::::::'
 *            '::::::::::::'
 *              .::::::::::
 *         '::::::::::::::..
 *              ..::::::::::::.
 *            ``::::::::::::::::
 *             ::::``:::::::::'        .:::.
 *            ::::'   ':::::'       .::::::::.
 *          .::::'      ::::     .:::::::'::::.
 *         .:::'       :::::  .:::::::::' ':::::.
 *        .::'        :::::.:::::::::'      ':::::.
 *       .::'         ::::::::::::::'         ``::::.
 *   ...:::           ::::::::::::'              ``::.
 *  ```` ':.          ':::::::::'                  ::::..
 *                     '.:::::'                    ':'````..
 */

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import vip.mate.core.rocketmq.config.MqConfig;
import vip.mate.core.rocketmq.entity.Users;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * RocketMQ 消息生产者测试Controller
 *
 * @Date 2021/4/25 3:55 下午
 */
@Slf4j
@RestController
@RequestMapping("/rocketmq/producer")
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * basic：MQ 发送字符串消息
     *
     * @param msg : 发送内容
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 3:58 下午
     */
    @RequestMapping("/sendString")
    public Object sendString(@RequestParam(value = "msg", required = true, defaultValue = "你好，RocketMQ") String msg) {
        // 同步发送
        SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICSTRING, msg);
        log.info("MQ发送结果：{}", sendResult);
        return sendResult;
    }

    /**
     * basic：发送User对象
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 3:59 下午
     */
    @RequestMapping("/testSendUserModel")
    public Object testSendUserModel() {
        // 构造Users对象
        Users users = Users.builder().id(1).name("sunjs").registerTime(DateUtil.date()).build();
        // 同步发送
        SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICUSERS, users);
        log.info("MQ发送结果：{}", sendResult);
        return sendResult;
    }

    /**
     * consumemodel：测试有序消息队列
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 5:07 下午
     */
    @RequestMapping("/testOrderly")
    public Object testOrderly() {
        // 循环构造对象
        for (int i = 1; i <= 30; i++) {
            // 将循环下标当成用户id
            Users users = Users.builder().id(i).name(RandomUtil.randomNumbers(4)).build();
            // 同步发送
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConfig.Topic.TOPICUSERSORDERLY, users, "hashkey");
            // 屏蔽日志输出，方便监控消费者日志输出
            log.info("MQ发送结果：{}", sendResult);
        }
        return "有序批量发送完成";
    }

    /**
     * consumemodel：测试无序消息队列
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 5:10 下午
     */
    @RequestMapping("/testConcurrently")
    public Object testConcurrently() {
        // 循环构造对象
        for (int i = 1; i <= 30; i++) {
            // 将循环下标当成用户id
            Users users = Users.builder().id(i).build();
            // 同步发送
            SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICUSERSCONCURRENTLY, users);
            // 屏蔽日志输出，方便监控消费者日志输出
            log.info("MQ发送结果：{}", sendResult);
        }
        return "无序批量发送完成";
    }

    /**
     * RocketMQ 发送带有tag的消息
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 5:19 下午
     */
    @RequestMapping("/tag")
    public Object tag() {
        // 发送tag1消息
        SendResult sendResult1 = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICTAG + ":tag1", "这是一条字符串消息，给tag1的");
        log.info("tag1发送结果：{}", sendResult1);
        // 发送tag2消息
        SendResult sendResult2 = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICTAG + ":tag2", "这是一条字符串消息，给tag2的");
        log.info("tag2发送结果：{}", sendResult2);
        return "发送完成";
    }

    /**
     * broadcasting 测试广播发送消息
     * 所有订阅该Topic的消费者都会收到通知
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 6:23 下午
     */
    @RequestMapping("/sendBroadcasting")
    public Object sendBroadcasting() {
        SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICBROADCASTING, "<广播消息>");
        return sendResult;
    }

    /**
     * 其他消息语法
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 5:46 下午
     */
    @RequestMapping("/other")
    public Object other() {
        // 发送单向消息
        // 消息不可靠，性能高，只负责往服务器发送一条消息，不会重试也不关心是否发送成功
        // 此方式发送消息的过程耗时非常短，一般在微秒级别，无返回值
        // 不建议使用
        // rocketMQTemplate.sendOneWay(MqConfig.Topic.TOPICOTHER, "这是一条单向消息XXXXXXXX");


        // 延迟消息
        // 预设值的延迟时间间隔为：1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h (不允许自定义)
        // 如果我们要延迟30秒，则设置为4即可
        // 设置延迟10秒，解释：2021-04-25 18:06:30 发送成功，那么消费是在 2021-04-25 18:06:40
        // 第三个参数3000表示：发送超时时间，默认好像是3秒
        // Message<String> message = MessageBuilder.withPayload("这是延迟消息内容").build();
        // SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICOTHER, message, 3000, 3);
        // log.info("延迟消息发送结果：{}", sendResult);


        // 异步消息
        // rocketMQTemplate.asyncSend(MqConfig.Topic.TOPICOTHER, "这是异步消息", new SendCallback() {
        //     @Override
        //     public void onSuccess(SendResult sendResult) {
        //         log.info("异步消息发送成功：{}", sendResult);
        //     }
        //     @Override
        //     public void onException(Throwable e) {
        //         log.info("异步消息发送失败：{}", e.getMessage());
        //     }
        // });


        // 发送带有自定义key值的消息（在RocketMQ-console管理后台中可以根据这个key值进行查询）
        // 不过使用messageId也是可以的，建议每次发送完成后和消费者接收后 都打印出mq消息的必要信息，方便查询信息
        // Message<String> message = MessageBuilder.withPayload("带有key值的消息").setHeader(MessageConst.PROPERTY_KEYS, "123456789").build();
        // SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICOTHER, message);
        // log.info("带有key值消息发送结果：{}", sendResult);

        return "发送完成";
    }

    /**
     * 测试发送事务消息
     *
     * @param type : rollback：表示业务处理出现异常，直接回滚MQ消息；
     *             submitMqFail：表示业务处理事务刚提交处理完成，在发送确认MQ消息之前系统挂了；
     *             rewardCouponFail：表示发券过程中挂了，事务还未提交；
     *             其他字符：业务处理完成，通知MQ成功
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/25 6:43 下午
     */
    @RequestMapping("/sendTransaction")
    public Object sendTransaction(String type) {
        // 所谓事务消息应该理解成 MQ的事务，而不是开发人员所理解的事务，给你数据库进行回滚之类的。这里的事务是保证消息能正常发送和消费，是这个事务
        // 发送事务MQ，应该先发送MQ消息，再进行业务处理。
        // 发送半消息

        // 创建个用户对象，带到事务处理中
        Users users = Users.builder().build();
        users.setId(1);
        users.setName("RocketMQ");
        users.setRegisterTime(DateUtil.date());

        // 使用map作为消息类型
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("type", type);
        paramMap.put("other", "这是其他信息");

        // 模拟场景：比如在用户注册、或者下单成功后，我们需要给用户发送一张券，并且发送短信通知用户
        // 关键点1：发券和通知都是异步处理
        // 关键点2：发券和通知都必须要成功。发券不成功的话，短信一定不要发送。发券成功的话，短信也一定要发送。（不考虑短信发送失败，这个是消费者落地后的操作）
        // 流程1：注册或者下单成功后，立即发送MQ消息，这个MQ消息就是告诉RocketMQ，我是一条事务消息，待确认消息，该消息RocketMQ不会去消费，而是处于等待过程中。
        // 流程2：流程1发送的这条MQ消息的本地事务（executeLocalTransaction），我们在这里进行发券操作，这个业务处理尽量在一个事务中处理，处理完成后，进行事务提交、或者回滚等操作
        // 如果流程2返回的是提交操作，那么流程1所发的待确认消息将会通知消费者进行消费，反之如果是回滚操作，则该消息将会被删除。
        // 如果流程2处理过程中，服务器挂了，或者其他任何不可控因素，导致流程2最后没有提交或者回滚操作。我们这里使用unknown状态来模拟服务器挂掉
        // 那么RocketMQ 将会调用checkLocalTransaction方法进行回查，回查频率为：1分钟回查一次，默认回查15次

        Message<Map<String, Object>> message = MessageBuilder.withPayload(paramMap).build();
        TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(MqConfig.Topic.topic_transaction_reward_coupon, message, users);
        log.info("事务消息发送结果：{}", transactionSendResult);
        return transactionSendResult;
    }

    /**
     * 批量发送消息
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/26 7:35 下午
     */
    @RequestMapping("/sendBatchMessage")
    public Object sendBatchMessage() {
        List<Message> messageList = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            Users users1 = Users.builder().id(1).name("sunjs-1").registerTime(DateUtil.date()).build();
            Message<Users> message = MessageBuilder.withPayload(users1).build();
            messageList.add(message);
        }
        SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICBATCH, messageList);
        log.info("批量消息发送结果：{}", sendResult);

        return sendResult;
    }

    /**
     * RocketMQ 双主双从集群消息 宕机故障测试
     *
     * @Author: sun
     * @Date: 2021/4/28 8:39 下午
     */
    @RequestMapping("/cluster")
    public Object cluster() {
        SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TopicCluster, "RocketMQ双主双从测试");
        log.info("《集群生产者》发送结果：{}", sendResult);
        return sendResult;
    }

    /**
     * RocketMQ DLedger 宕机故障测试
     *
     * @Author: sun
     * @Date: 2021/4/28 8:39 下午
     */
    @RequestMapping("/dledger")
    public Object dledger() {
        SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TopicDledger, "测试broker自动切换");
        log.info("DLedger发送结果：{}", sendResult);
        return sendResult;
    }

}
